package tableSink

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}

/** Demo job: registers a CSV file as a table, aggregates it with both the
  * Table API and SQL, and registers a second CSV-backed table meant to
  * receive output.
  */
object FileOutput {

  /** Entry point.
    *
    * Builds the streaming/table environments, registers `inputTable` and
    * `outputTable`, defines two equivalent "count rows per id" queries, and
    * then calls `env.execute`.
    *
    * @param args command-line arguments (not read)
    */
  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    // 1. Register the input file as a table
    val filePath = "E:\\Flinklearn\\Flink\\resources\\data1\\sensor.txt"
    tableEnv.connect(new FileSystem().path(filePath))
      .withFormat(new Csv())
      .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("timestamp", DataTypes.BIGINT())
        .field("temp", DataTypes.DOUBLE()))
      .createTemporaryTable("inputTable")

    // 2. Transformations: count rows per sensor id, once via the Table API
    //    and once via SQL
    val sensorTable: Table = tableEnv.from("inputTable")
    val resultT1: Table = sensorTable.groupBy('id).select('id, 'id.count)

    val resultT2: Table = tableEnv.sqlQuery("select id,count(id) from inputTable group by id")

    //    resultT1.toRetractStream[(String,Long)].print("df")
    //    resultT2.toRetractStream[(String,Long)].print("sql")

    // 3. Register the output file as a table.
    //    The descriptor must point at outPath; the original pointed at
    //    filePath, which would have made the sink target the input file.
    val outPath = "E:\\Flinklearn\\Flink\\resources\\data1\\output.txt"
    tableEnv.connect(new FileSystem().path(outPath))
      .withFormat(new Csv())
      .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        // Second column is BIGINT; the queries above produce a count here,
        // despite the field being named "temp".
        .field("temp", DataTypes.BIGINT()))
      .createTemporaryTable("outputTable")

    // NOTE(review): resultT1 is a grouped aggregation, so it presumably
    // produces an updating (retract) result; the filesystem CSV sink is
    // likely append-only and may reject it — confirm before enabling.
    //    resultT1.insertInto("outputTable")

    // NOTE(review): with the print sinks and the insert commented out, no
    // sink is visibly attached to the job — confirm execute() behaves as
    // intended on such a topology.
    env.execute("wacao")

  }

}
